#include <iostream>
#include <thread>
#include <boost/beast/core/detail/base64.hpp>
#include <fstream>
#include "messagehandler.h"
#include "zmq_messenger.h"
#include "loghelper.h"
#include "bytes_buffer.hpp"
#include "http_upload.h"
#include "serialize.hpp"
#include "call_async.hpp"
#include "utility.hpp"

using namespace std;
using namespace RockLog;

void MessageHandler::init(AppConfig_t &cfg)
{
    m_cfg = cfg;
}

void MessageHandler::sendMsg(const std::string &workerId, const CameraInfo_t &info)
{
    std::stringstream ss;
    ss.write((char *)&kCameraInfo_t, 4);
    serialize(ss, info);
    sendMsg(workerId, ss.str());
}

void MessageHandler::sendMsg(const std::string &workerId, const char *msg, uint32_t len)
{
    publishMsg(workerId, msg, len);
}

void MessageHandler::sendMsg(const std::string &workerId, const std::string &str)
{
    publishMsg(workerId, str.c_str(), str.size());
}

void MessageHandler::doWorkerUploadVideoAndTracks(BytesBuffer &buf)
{
    // deserialize string
    std::istringstream is(buf.retrieveAllAsString());
    VideoAndTracks_t t;
    deserialize(is, t.tracksStr);
    deserialize(is, t.videoName);
    deserialize(is, t.videoData);
    int uploadCount = 0;
    int ret = 0;

    std::string host = m_cfg.uploadServerIP;
    std::string port = m_cfg.uploadServerPort;
    std::string url  = m_cfg.uploadServerURL;
    std::vector<std::tuple<std::string, std::string>> vec;
    vec.push_back(std::make_tuple(t.videoName, t.videoData));
    std::string base64_stream = boost::beast::detail::base64_encode(t.tracksStr);
    LOG(kInfo) << "stream size: " << t.tracksStr.size() << ", base64_stream size: " << base64_stream.size();

#ifdef MASTER_DEBUG
    std::ofstream outfile ("test.str");
    outfile.write(base64_stream.c_str(), base64_stream.size());
    outfile.close();
#endif
    do{
        ret = httpUpload(host, port, url, vec, base64_stream);
        if (0 == ret)
        {
            LOG(kInfo) << "upload OK";
            break;
        }
        Utility::usleep(1000000);
        LOG(kInfo) << "try upload count="<<uploadCount+1;
    }while(++uploadCount < 10);

    if(uploadCount == 10)
    {
        LOG(kInfo) << "upload not OK";
    }
}

void MessageHandler::onRecvMsg(const char *workerId, const char *msg, uint32_t len)
{
    LOG(kInfo) << "[MessageHandler::onRecvMsg]";
    BytesBuffer buf;
    uint32_t type;
    char recvId[5] = {0};
    buf.append(msg, len);
    buf.retrieve(recvId, 4);

    if (0 != strcmp(workerId, recvId))
    {
        LOG(kErr) << "invalid workerId:" << recvId;
        return;
    }

    buf.retrieve((char *)&type, 4);
    LOG(kInfo) << "type: " << type;
    switch (type)
    {
    case kJsonMessage:
        LOG(kInfo) << "receive json message";
        break;
    case kStringMessage:
        LOG(kInfo) << "receive string message";
        break;
    case kWorkerUploadVideoAndTracks_t:
        LOG(kInfo) << "receive kWorkerUploadVideoAndTracks_t";
        really_async(&MessageHandler::doWorkerUploadVideoAndTracks,this,buf);
        break;
    default:
        LOG(kInfo) << "MessageHandler::onRecvMsg xxxx";
        break;
    }
}
